{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Working with TFRecord Datasets\n",
    "\n",
    "1. [Introduction](#Introduction)\n",
    "1. [Prerequisites](#Prerequisites)\n",
    "1. [Converting a dataset from CSV to TFrecords](#Converting-a-dataset-from-CSV-to-TFrecords)\n",
    " 1. [Upload dataset to S3](#Upload-dataset-to-S3)\n",
    "1. [Construct a DNNClassifier](#Construct-a-DNNClassifier)\n",
    "1. [Train a Model](#Train-a-Model)\n",
    "1. [Run Batch Transform](#Run-Batch-Transform)\n",
    " 1. [Build a container for transforming TFRecord input](#Build-a-container-for-transforming-TFRecord-input)\n",
    " 1. [Push container to ECR](#Push-container-to-ECR)\n",
    " 1. [Create a model with an inference pipeline](#Create-a-model-with-an-inference-pipeline)\n",
    " 1. [Run a batch transform job](#Run-a-batch-transform-job)\n",
    " 1. [Inspect batch transform output](#Inspect-batch-transform-output)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Introduction\n",
    "\n",
    "TFRecord is a standard TensorFlow data format. It is a record-oriented binary file format that allows for efficient storage and processing of large datasets. In this notebook, we’ll demonstrate how to take an existing CSV dataset and convert it to TFRecord files. We’ll also build a TensorFlow training script that accepts serialized tf.Example protos (the payload of our TFRecords) as input during training. Then, we'll run a training job using the TFRecord dataset we've generated as input. Finally, we'll demonstrate how to run a batch transform job with an inference pipeline so that we can pass the TFRecord dataset as input."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prerequisites\n",
    "\n",
    "Let's start by specifying:\n",
    "* The S3 bucket and prefixes you'd like to use for training and batch transform data.\n",
    "* The IAM role that will be used for training and batch transform jobs, as well as ECR repository creation and image upload."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import tensorflow as tf\n",
    "\n",
    "bucket = '<your_bucket_name>'\n",
    "training_prefix = 'training'\n",
    "batch_input_prefix = 'batch_input'\n",
    "batch_output_prefix ='batch_output'\n",
    "\n",
    "sess = sagemaker.Session()\n",
    "role = sagemaker.get_execution_role()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Converting a dataset from CSV to TFRecords\n",
    "\n",
    "First, we'll take an existing CSV dataset (located in `./dataset-csv/`) and convert it to the TFRecords file format:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "csv_root = './dataset-csv/'\n",
    "tfrecord_root = './dataset-tfrecord/'\n",
    "test_csv_file = 'iris_test.csv'\n",
    "train_csv_file = 'iris_train.csv'\n",
    "test_tfrecord_file = 'iris_test.tfrecords'\n",
    "train_tfrecord_file = 'iris_train.tfrecords'\n",
    "\n",
    "def _floatlist_feature(value):\n",
    "    return tf.train.Feature(float_list=tf.train.FloatList(value=[float(value)]))\n",
    "\n",
    "def _int64list_feature(value):\n",
    "    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))\n",
    "\n",
    "# create the tfrecord dataset dir\n",
    "if not os.path.isdir(tfrecord_root):\n",
    "    os.mkdir(tfrecord_root)\n",
    "\n",
    "for input_file, output_file in [(test_csv_file,test_tfrecord_file), (train_csv_file,train_tfrecord_file)]:\n",
    "    # create the output file\n",
    "    open(tfrecord_root + output_file, 'a').close()\n",
    "    with tf.python_io.TFRecordWriter(tfrecord_root + output_file) as writer:\n",
    "        with open(csv_root + input_file,'r') as f:\n",
    "            f.readline() # skip first line\n",
    "            for line in f:\n",
    "                feature = {\n",
    "                    'sepal_length': _floatlist_feature(line.split(',')[0]),\n",
    "                    'sepal_width': _floatlist_feature(line.split(',')[1]),\n",
    "                    'petal_length': _floatlist_feature(line.split(',')[2]),\n",
    "                    'petal_width': _floatlist_feature(line.split(',')[3]),\n",
    "                }\n",
    "                if f == train_csv_file:\n",
    "                    feature['label'] = _int64list_feature(int(line.split(',')[4].rstrip()))\n",
    "                example = tf.train.Example(\n",
    "                    features=tf.train.Features(\n",
    "                        feature=feature\n",
    "                    )\n",
    "                )\n",
    "                writer.write(example.SerializeToString())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Upload dataset to S3\n",
    "\n",
    "Next, we'll upload the TFRecord datasets to S3 so that we can use it in training and batch transform jobs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def upload_to_s3(bucket, key, file):\n",
    "    s3 = boto3.resource('s3')\n",
    "    data = open(file, \"rb\")\n",
    "    s3.Bucket(bucket).put_object(Key=key, Body=data)\n",
    "    \n",
    "upload_to_s3(bucket, training_prefix + '/' + train_tfrecord_file, tfrecord_root + train_tfrecord_file)\n",
    "upload_to_s3(bucket, batch_input_prefix + '/' + test_tfrecord_file, tfrecord_root + test_tfrecord_file)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Construct a DNN Classifier\n",
    "\n",
    "In `./dnn-classifier/train.py` we've defined a neural network classifier using TensorFlow's DNNClassifier. We can take a look at the train script to see how the network and input functions are defined:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!cat ./dnn-classifier/train.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train a Model\n",
    "\n",
    "Next, we'll kick off a training job using the training script defined above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.tensorflow import TensorFlow\n",
    "\n",
    "train_data_location = 's3://{}/{}'.format(bucket, training_prefix)\n",
    "instance_type = 'ml.c4.xlarge'\n",
    "\n",
    "estimator = TensorFlow(entry_point='train.py',\n",
    "                       source_dir='dnn-classifier',\n",
    "                       model_dir='/opt/ml/model',\n",
    "                       train_instance_type=instance_type,\n",
    "                       train_instance_count=1,\n",
    "                       role=sagemaker.get_execution_role(), # Passes to the container the AWS role that you are using on this notebook\n",
    "                       framework_version='1.11.0', # Uses TensorFlow 1.11\n",
    "                       py_version='py3',\n",
    "                       script_mode=True)\n",
    "\n",
    "inputs = {'training': train_data_location}\n",
    "\n",
    "estimator.fit(inputs)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run Batch Transform"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build a container for transforming TFRecord input\n",
    "\n",
    "The SageMaker TensorFlow Serving container uses the TensorFlow ModelServer RESTful API to serve predict requests. In the next step, we'll create a container to transform mini-batch TFRecord payloads into JSON objects that can be forwarded to the TensorFlow serving container. To do this, we've created a simple Python Flask app that does the transformation, the code for this container is available in the `./tfrecord-transformer-container/` directory. First, we'll build the container:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!docker build -t tfrecord-transformer ./tfrecord-transformer-container/"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Push container to ECR\n",
    "\n",
    "Next, we'll push the docker container to an ECR repository in your account. In order to push the container to ECR, the execution role attached to this notebook should have permissions to create a repository, set a repository policy, and upload an image."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "account_id = boto3.client('sts').get_caller_identity().get('Account')\n",
    "region = boto3.session.Session().region_name\n",
    "\n",
    "ecr_repository = 'tfrecord-transformer'\n",
    "tag = ':latest'\n",
    "transformer_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}'.format(account_id, region, ecr_repository + tag)\n",
    "\n",
    "# docker login\n",
    "!$(aws ecr get-login --region us-west-2 --registry-ids $account_id --no-include-email)\n",
    "# create ecr repository\n",
    "!aws ecr create-repository --repository-name $ecr_repository\n",
    "# attach policy allowing sagemaker to pull this image\n",
    "!aws ecr set-repository-policy --repository-name $ecr_repository --policy-text \"$( cat ./tfrecord-transformer-container/ecr_policy.json )\"\n",
    "\n",
    "!docker tag {ecr_repository + tag} $transformer_repository_uri\n",
    "!docker push $transformer_repository_uri"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a model with an inference pipeline\n",
    "\n",
    "Next, we'll create a SageMaker model with the two containers chained together (TFRecord transformer -> TensorFlow Serving)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.tensorflow.serving import Model\n",
    "from sagemaker.utils import name_from_base\n",
    "\n",
    "client = boto3.client('sagemaker')\n",
    "\n",
    "model_name = name_from_base('tfrecord-to-tfserving')\n",
    "\n",
    "transform_container = {\n",
    "    \"Image\": transformer_repository_uri\n",
    "}\n",
    "\n",
    "tf_serving_model = Model(model_data=estimator.model_data,\n",
    "                         role=sagemaker.get_execution_role(),\n",
    "                         image=estimator.image_name,\n",
    "                         framework_version=estimator.framework_version,\n",
    "                         sagemaker_session=estimator.sagemaker_session)\n",
    "tf_serving_container = tf_serving_model.prepare_container_def(instance_type)\n",
    "\n",
    "model_params = {\n",
    "    \"ModelName\": model_name,\n",
    "    \"Containers\": [\n",
    "        transform_container,\n",
    "        tf_serving_container\n",
    "    ],\n",
    "    \"ExecutionRoleArn\": sagemaker.get_execution_role()\n",
    "}\n",
    "\n",
    "client.create_model(**model_params)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Run a batch transform job\n",
    "\n",
    "Next, we'll run a batch transform job using our inference pipeline model. We'll specify `SplitType=TFRecord` and `BatchStrategy=MultiRecord` to specify that our dataset will be split by TFRecord boundaries, and multiple records will be batched in a single request up to the `MaxPayloadInMB=1` limit."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "input_data_path = 's3://{}/{}'.format(bucket, batch_input_prefix)\n",
    "output_data_path = 's3://{}/{}'.format(bucket, batch_output_prefix)\n",
    "\n",
    "transformer = sagemaker.transformer.Transformer(\n",
    "    model_name = model_name,\n",
    "    instance_count = 1,\n",
    "    instance_type = instance_type,\n",
    "    strategy = 'MultiRecord',\n",
    "    max_payload = 1,\n",
    "    output_path = output_data_path,\n",
    "    assemble_with= 'Line',\n",
    "    base_transform_job_name='tfrecord-transform',\n",
    "    sagemaker_session=sess,\n",
    ")\n",
    "transformer.transform(data = input_data_path,\n",
    "                      split_type = 'TFRecord')\n",
    "transformer.wait()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Inspect batch transform output\n",
    "\n",
    "Finally, we can inspect the output files of our batch transform job to see the predictions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "output_uri = transformer.output_path + '/' + test_tfrecord_file + '.out'\n",
    "!aws s3 cp $output_uri -"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_tensorflow_p36",
   "language": "python",
   "name": "conda_tensorflow_p36"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
